
refactor(submitter): concurrent submitter #3287

Open
julienrbrt wants to merge 2 commits into main from julien/speedup-submitter

Conversation

@julienrbrt
Member

@julienrbrt julienrbrt commented Apr 24, 2026

Overview

Attempt to improve the submitter by sending concurrently: we don't need to wait for the DA layer's response before submitting the next batch.
Useful when the throughput of blobs needs to be high.

Related to #3244: Fiber takes time to return, so this change is necessary there. This PR mainly investigates whether we can generalize that improvement to mainline evnode.
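The pattern described above can be sketched as fire-and-forget submission with completion callbacks plus a Close() that drains in-flight work. This is a minimal illustration under assumed names (Submitter, Submit, Close), not the PR's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Submitter is a hypothetical sketch of the callback-driven pattern:
// Submit returns immediately and the result is delivered via
// onSuccess/onError from a background goroutine.
type Submitter struct {
	wg sync.WaitGroup
}

// Submit schedules the batch and returns without waiting for the DA layer.
func (s *Submitter) Submit(batch []string, onSuccess func(count int), onError func(err error)) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		if len(batch) == 0 {
			onError(errors.New("empty batch"))
			return
		}
		// The real submitter would talk to the DA layer here.
		onSuccess(len(batch))
	}()
}

// Close waits for all in-flight submissions, mirroring the Close()
// lifecycle method added in this PR.
func (s *Submitter) Close() { s.wg.Wait() }

func main() {
	var s Submitter
	done := make(chan int, 1)
	s.Submit([]string{"h1", "h2"}, func(n int) { done <- n }, func(err error) { done <- -1 })
	s.Close()
	fmt.Println(<-done) // prints 2
}
```

The key throughput property is that the caller's loop can fetch and enqueue the next pending range while earlier batches are still awaiting DA inclusion.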

Summary by CodeRabbit

  • New Features

    • Submissions are now asynchronous with success/error callbacks and a Close method for clean shutdown.
  • Refactor

    • Submission flow: improved retry/backoff, batch-size limiting, and reporting.
    • Pending-item tracking reworked to claim contiguous ranges, track in-flight ranges, and expose total vs available counts.
  • Bug Fixes

    • Failed ranges are re-exposed for retry and pending items are no longer lost on error.

@coderabbitai
Contributor

coderabbitai Bot commented Apr 24, 2026

📝 Walkthrough

Walkthrough

Adds cache APIs to reset and query total pending ranges; rewrites pending selection to track contiguous in-flight claims and gaps; converts DA submission to asynchronous, callback-driven submission with centralized retry/backoff and a Close() lifecycle method.

Changes

Cohort / File(s) Summary
Cache manager & public APIs
block/internal/cache/manager.go, block/internal/cache/pending_headers.go, block/internal/cache/pending_data.go
Extended PendingManager and related types with ResetInFlightHeaderRange, ResetInFlightDataRange, NumPendingHeadersTotal, and NumPendingDataTotal; added implementation wrappers and convenience reset.
Pending base (claims & gaps)
block/internal/cache/pending_base.go, block/internal/cache/pending_base_test.go
Reworked pending allocation to select contiguous unclaimed ranges, track inFlightClaims and gaps, added helpers for range ops, resetInFlightRange, numPendingTotal, and updated trimming when advancing last-submitted heights.
Cache tests & small test updates
block/internal/cache/manager_test.go, block/internal/cache/pending_headers_test.go, block/internal/cache/pending_data_test.go
Adjusted tests to reset/inspect in-flight ranges and assert pending counts immediately after resets; added new cases exercising claim/re-expose and partial-advance behaviors.
DA submitter core (async, retry, lifecycle)
block/internal/submitting/da_submitter.go, block/internal/submitting/submitter.go
Made DA submissions asynchronous with onSubmitSuccess/onSubmitError callbacks, introduced submitWithRetry with backoff and retry logic, switched batch limiting to byte-size, moved post-submit side effects to callbacks, reset in-flight ranges on failures, and added Close() to wait for in-flight goroutines.
Tracing & submitter API updates
block/internal/submitting/da_submitter_tracing.go, block/internal/submitting/submitter.go
Updated tracing wrapper and submitter API signatures to accept success/error callbacks and added Close() forwarding in the tracer.
Submitter tests & mocks
block/internal/submitting/da_submitter_test.go, block/internal/submitting/da_submitter_integration_test.go, block/internal/submitting/da_submitter_mocks_test.go, block/internal/submitting/da_submitter_tracing_test.go, block/internal/submitting/submitter_test.go
Reworked tests and mocks to the callback-based DA submitter API, added batch-retry edge-case tests, updated helpers to defer Close() and validate callback-driven outcomes.
Executor usage
block/internal/executing/executor.go
Switched ProduceBlock pending-limit gating to use total-pending counters (NumPendingHeadersTotal, NumPendingDataTotal) instead of non-total variants.
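The contiguous-range claim selection described for pending_base.go can be sketched roughly as follows. hrange, findAvailableRange, and all shapes here are illustrative assumptions, not the repository's implementation.

```go
package main

import "fmt"

// hrange is a closed height range [start, end].
type hrange struct{ start, end uint64 }

// findAvailableRange is a simplified sketch of the selection described
// above: starting just past lastSubmitted, skip heights covered by
// in-flight claims and hand out the first contiguous unclaimed run,
// capped at maxItems.
func findAvailableRange(lastSubmitted, head uint64, claims []hrange, maxItems uint64) (hrange, bool) {
	h := lastSubmitted + 1
	for h <= head {
		claimed := false
		for _, c := range claims {
			if h >= c.start && h <= c.end {
				h = c.end + 1 // jump past the in-flight claim
				claimed = true
				break
			}
		}
		if claimed {
			continue
		}
		// Extend the run until we hit a claim, the head, or the size cap.
		end := h
		for end < head && end-h+1 < maxItems {
			next := end + 1
			blocked := false
			for _, c := range claims {
				if next >= c.start && next <= c.end {
					blocked = true
					break
				}
			}
			if blocked {
				break
			}
			end = next
		}
		return hrange{h, end}, true
	}
	return hrange{}, false
}

func main() {
	// Heights 1-10 exist; 3-5 are in flight, nothing submitted yet.
	r, ok := findAvailableRange(0, 10, []hrange{{3, 5}}, 100)
	fmt.Println(ok, r.start, r.end) // first unclaimed contiguous run: 1-2
}
```

On failure, resetting the in-flight range would re-expose those heights (as a gap) so a later call can claim them again, which is the retry behavior the Bug Fixes bullet describes.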

Sequence Diagrams

sequenceDiagram
    participant SubmitLoop as Submit loop
    participant Cache as Cache (pending manager)
    participant DASubmitter as DA Submitter
    participant DA as DA Layer

    SubmitLoop->>Cache: GetPendingHeaders()
    Cache->>Cache: choose contiguous unclaimed range\nregister in-flight claim
    Cache-->>SubmitLoop: return batch + (start,end)

    SubmitLoop->>DASubmitter: SubmitHeaders(batch, onSuccess, onError)
    DASubmitter-->>SubmitLoop: return nil (async)

    DASubmitter->>DASubmitter: spawn goroutine -> submitWithRetry
    DASubmitter->>DA: submit batch

    alt DA success
        DA-->>DASubmitter: success
        DASubmitter->>Cache: apply post-submit updates
        DASubmitter->>SubmitLoop: call onSuccess()
    else retryable failure
        DA-->>DASubmitter: error
        DASubmitter->>DASubmitter: backoff & retry
    else terminal error
        DA-->>DASubmitter: error
        DASubmitter->>SubmitLoop: call onError(error)
        SubmitLoop->>Cache: ResetInFlightHeaderRange(start,end)
    end
sequenceDiagram
    participant SubmitLoop as Submit loop
    participant Cache as pendingBase
    participant DASubmitter as DA Submitter

    SubmitLoop->>Cache: GetPendingData()
    Cache->>Cache: select first unclaimed contiguous range\nregister claim, remove overlapping gaps
    Cache-->>SubmitLoop: pending items + (start,end)

    SubmitLoop->>DASubmitter: SubmitData(batch, onSuccess, onError)
    DASubmitter-->>SubmitLoop: nil

    DASubmitter->>DASubmitter: submitWithRetry -> onSuccess/onError callbacks
    alt onError called
        SubmitLoop->>Cache: ResetInFlightDataRange(start,end)
        Cache->>Cache: remove claim, reinsert failing portion as gap
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested reviewers

  • tac0turtle
  • MSevey

Poem

🐰 I hop through claims and gaps with glee,
Async submits set my paws free.
Retries hum soft when batches fly,
Reset a range, and try nearby.
Tiny rabbit cheers: back to the sky!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 23.40%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
  • Title check: ✅ Passed. The title accurately describes the main refactoring focus on making the submitter concurrent, which is the core change across all modified files.
  • Description check: ✅ Passed. The description provides context about the goal (concurrent sends for higher throughput) and rationale (Fiber latency), matching the overview template requirement.
  • Linked Issues check: ✅ Passed. Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check: ✅ Passed. Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@github-actions
Contributor

github-actions Bot commented Apr 24, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build: ✅ passed | Format: ⏩ skipped | Lint: ✅ passed | Breaking: ✅ passed | Updated (UTC): Apr 29, 2026, 3:16 PM

@claude
Contributor

claude Bot commented Apr 24, 2026

Claude encountered an error. View job


I'll analyze this and get back to you.

@codecov

codecov Bot commented Apr 24, 2026

Codecov Report

❌ Patch coverage is 62.98201% with 144 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.59%. Comparing base (05979c1) to head (1daa429).

Files with missing lines Patch % Lines
block/internal/submitting/da_submitter.go 73.00% 29 Missing and 15 partials ⚠️
block/internal/submitting/submitter.go 22.80% 34 Missing and 10 partials ⚠️
block/internal/cache/pending_base.go 69.71% 30 Missing and 13 partials ⚠️
block/internal/cache/manager.go 40.00% 6 Missing ⚠️
block/internal/cache/pending_data.go 40.00% 3 Missing ⚠️
block/internal/cache/pending_headers.go 50.00% 2 Missing ⚠️
block/internal/submitting/da_submitter_tracing.go 66.66% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3287      +/-   ##
==========================================
+ Coverage   60.52%   60.59%   +0.07%     
==========================================
  Files         126      126              
  Lines       13721    13910     +189     
==========================================
+ Hits         8304     8429     +125     
- Misses       4513     4559      +46     
- Partials      904      922      +18     
Flag Coverage Δ
combined 60.59% <62.98%> (+0.07%) ⬆️

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.

@julienrbrt julienrbrt marked this pull request as ready for review April 29, 2026 08:05
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
block/internal/submitting/da_submitter_tracing.go (1)

33-61: ⚠️ Potential issue | 🟠 Major

The span now ends before the async submission does.

inner.SubmitHeaders/SubmitData now return right after scheduling background work, so defer span.End() closes the span before retries/callbacks run. Async failures routed through onSubmitError never reach the span, and the recorded latency is only the enqueue time. Wrap the callbacks and end/annotate the span from the actual completion path instead.

Also applies to: 64-92

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/da_submitter_tracing.go` around lines 33 - 61, The
current tracedDASubmitter starts a span in SubmitHeaders (and similarly in
SubmitData) but defers span.End(), which closes the span before async work
completes; instead remove the immediate defer span.End() and wrap the
onSubmitSuccess and onSubmitError callbacks with closures that record
errors/status on the span and call span.End() when the async completion path
runs; i.e., in tracedDASubmitter.SubmitHeaders and SubmitData, create
wrappedOnSuccess := func(){ span.SetStatus(codes.Ok, ""); onSubmitSuccess();
span.End() } and wrappedOnError := func(err error){ span.RecordError(err);
span.SetStatus(codes.Error, err.Error()); onSubmitError(err); span.End() } (or
equivalent wrappers matching original signatures) and pass those to
t.inner.SubmitHeaders/SubmitData so the span lifetime and annotations reflect
actual completion.
block/internal/cache/pending_base.go (1)

80-96: ⚠️ Potential issue | 🟠 Major

Take lastHeight and the in-flight ranges under one synchronization boundary.

getPending() reads lastHeight before cloning inFlightClaims/gaps, while setLastSubmittedHeight() updates lastHeight and trims those slices independently. With the new concurrent submitter, the lastHeight=old + claims already trimmed interleaving can make findAvailableRange() hand out heights that were just acknowledged, causing duplicate DA submissions.

As per coding guidelines "Be careful with concurrent access to shared state".

Also applies to: 175-189

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/cache/pending_base.go` around lines 80 - 96, getPending()
currently reads pb.lastHeight (pb.lastHeight.Load()) outside the pb.inFlightMu
critical section and then clones pb.inFlightClaims and pb.gaps, which allows an
interleaving with setLastSubmittedHeight() that trims those slices and updates
lastHeight causing findAvailableRange() to return already-acknowledged heights;
fix by moving the read of pb.lastHeight inside the same
pb.inFlightMu.Lock()/Unlock() block where you clone inFlightClaims and gaps so
lastHeight and the in-flight ranges are read atomically, and apply the same
locking discipline to setLastSubmittedHeight() (acquire pb.inFlightMu while
trimming inFlightClaims/gaps and updating pb.lastHeight) to prevent races when
findAvailableRange, getPending, and setLastSubmittedHeight interact.
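The single-synchronization-boundary discipline the prompt describes can be sketched as follows. pendingState and its methods are illustrative stand-ins for pendingBase, not the repository's code; the point is that the snapshot and the acknowledgement touch both pieces of state under one mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingState models the fix: lastSubmitted and the in-flight claims
// are always read and written under the same mutex, so a concurrent
// acknowledgement cannot interleave between the two reads.
type pendingState struct {
	mu            sync.Mutex
	lastSubmitted uint64
	inFlight      [][2]uint64 // closed ranges [start, end]
}

// snapshot returns a mutually consistent view of both fields.
func (p *pendingState) snapshot() (uint64, [][2]uint64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	claims := make([][2]uint64, len(p.inFlight))
	copy(claims, p.inFlight)
	return p.lastSubmitted, claims
}

// ack advances lastSubmitted and trims claims under the same lock.
func (p *pendingState) ack(height uint64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.lastSubmitted = height
	kept := p.inFlight[:0]
	for _, c := range p.inFlight {
		if c[1] > height { // keep only claims extending past the acked height
			kept = append(kept, c)
		}
	}
	p.inFlight = kept
}

func main() {
	p := &pendingState{inFlight: [][2]uint64{{1, 5}, {6, 9}}}
	p.ack(5)
	last, claims := p.snapshot()
	fmt.Println(last, claims)
}
```

Reading lastSubmitted outside the lock and the claims inside it is exactly the interleaving the review flags, since the snapshot could then pair a stale height with already-trimmed claims.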
🧹 Nitpick comments (3)
block/internal/cache/pending_data.go (1)

88-90: Document the new exported reset helper.

ResetInFlightDataRange is public, but it has no doc comment. That violates the repo's Go guideline for exported identifiers and will likely fail linting.

💡 Suggested fix
+// ResetInFlightDataRange clears the in-flight data claim state for heights in [start, end].
 func (pd *PendingData) ResetInFlightDataRange(start, end uint64) {

As per coding guidelines, Document exported types and functions in Go code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/cache/pending_data.go` around lines 88 - 90, Add a Go doc
comment for the exported method ResetInFlightDataRange on type PendingData:
describe what the method does, its parameters (start, end uint64) and any
important behavior or side-effects (it delegates to pd.base.resetInFlightRange
to reset the in-flight data range). Place the comment immediately above the func
declaration for ResetInFlightDataRange so it satisfies Go exported identifier
documentation guidelines.
block/internal/cache/pending_headers.go (1)

83-85: Document the new exported reset helper.

ResetInFlightHeaderRange is public, but it has no doc comment. That violates the repo's Go guideline for exported identifiers and will likely fail linting.

💡 Suggested fix
+// ResetInFlightHeaderRange clears the in-flight header claim state for heights in [start, end].
 func (ph *PendingHeaders) ResetInFlightHeaderRange(start, end uint64) {

As per coding guidelines, Document exported types and functions in Go code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/cache/pending_headers.go` around lines 83 - 85, Add a Go doc
comment for the exported method PendingHeaders.ResetInFlightHeaderRange
describing its purpose and behavior: explain that it resets the in-flight header
range tracked by the PendingHeaders instance between the inclusive start and end
uint64 indices, mention any side effects (it delegates to
ph.base.resetInFlightRange), and document the parameters (start, end) and any
expectations (e.g., inclusive bounds or preconditions). Place the comment
immediately above the ResetInFlightHeaderRange method.
block/internal/submitting/submitter_test.go (1)

424-440: Exercise the new callbacks in the fake submitter.

This test double now accepts onSubmitSuccess/onSubmitError, but it still discards them. That means the loop test only verifies enqueueing, not the success/error lifecycle that now updates timestamps and resets in-flight cache state.

Consider invoking the callbacks when non-nil or adding a focused test that covers that contract.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/submitter_test.go` around lines 424 - 440, The
fakeDASubmitter currently drops the provided callbacks in SubmitHeaders and
SubmitData; update these methods (SubmitHeaders and SubmitData on
fakeDASubmitter) to call the supplied on-success and on-error callbacks when
they are non-nil so the test exercises the full success/error lifecycle (e.g.,
invoke the success callback when you want the fake to simulate success, or
invoke the error callback with a test error to simulate failure), while
preserving the existing signaling to chHdr/chData; alternatively add a focused
test that uses a fake submitter which invokes those callbacks to assert
timestamps and in-flight cache resets.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@block/internal/submitting/da_submitter_integration_test.go`:
- Around line 101-110: The test currently calls daSubmitter.Close() only at the
end, risking resource leaks if earlier assertions fail; after creating the async
submitter (the daSubmitter variable), ensure cleanup is guaranteed by
registering a deferred close or t.Cleanup call—e.g., immediately after
daSubmitter is constructed call defer daSubmitter.Close() or t.Cleanup(func(){
daSubmitter.Close() }) so the Close() method on daSubmitter always runs even if
the test fails early.

In `@block/internal/submitting/da_submitter_test.go`:
- Around line 216-218: The test currently calls submitter.Close() after
assertions which can leak the submitter's async worker if an assertion fails;
change the teardown to run immediately after setup by invoking defer
submitter.Close() (or t.Cleanup(func(){ submitter.Close() })) right after the
submitter is created so Close() always runs even on test failures — update the
tests that call submitter.SubmitHeaders(...) and later submitter.Close() (e.g.,
the cases around SubmitHeaders and the other similar test) to use deferred
cleanup instead.

In `@block/internal/submitting/da_submitter.go`:
- Around line 388-398: The datalayer success branch uses res.SubmittedCount
directly which can be 0 or >len(marshaled) and cause infinite loops or panics;
in the datypes.StatusSuccess case (around res.SubmittedCount handling) validate
that submitted := int(res.SubmittedCount) is >0 and <= len(marshaled) before
calling onSuccess or advancing the window (marshaled = marshaled[submitted:]);
if submitted==0 treat as a reject/error (update rs.Attempt or return/log and do
not spin) and if submitted>len(marshaled) treat as malformed input (log/error
and reject) so only a validated count is passed to onSuccess and used to slice
marshaled.

In `@block/internal/submitting/submitter.go`:
- Around line 236-250: The code enqueues a batch as in-flight via
GetPendingHeaders/GetPendingData but if s.daSubmitter.SubmitHeaders or
SubmitData returns an immediate error the in-flight claim is never released;
update the error path in submitter.go around s.daSubmitter.SubmitHeaders and the
analogous SubmitData call so that before logging or returning on synchronous
error you call s.cache.ResetInFlightHeaderRange(headers[0].Height(),
headers[len(headers)-1].Height()) (and for data use the corresponding
ResetInFlightDataRange with the first/last data heights), then proceed to
log/handle the error (including the existing ErrOversizedItem handling) so the
claimed heights are retried.

---

Outside diff comments:
In `@block/internal/cache/pending_base.go`:
- Around line 80-96: getPending() currently reads pb.lastHeight
(pb.lastHeight.Load()) outside the pb.inFlightMu critical section and then
clones pb.inFlightClaims and pb.gaps, which allows an interleaving with
setLastSubmittedHeight() that trims those slices and updates lastHeight causing
findAvailableRange() to return already-acknowledged heights; fix by moving the
read of pb.lastHeight inside the same pb.inFlightMu.Lock()/Unlock() block where
you clone inFlightClaims and gaps so lastHeight and the in-flight ranges are
read atomically, and apply the same locking discipline to
setLastSubmittedHeight() (acquire pb.inFlightMu while trimming
inFlightClaims/gaps and updating pb.lastHeight) to prevent races when
findAvailableRange, getPending, and setLastSubmittedHeight interact.

In `@block/internal/submitting/da_submitter_tracing.go`:
- Around line 33-61: The current tracedDASubmitter starts a span in
SubmitHeaders (and similarly in SubmitData) but defers span.End(), which closes
the span before async work completes; instead remove the immediate defer
span.End() and wrap the onSubmitSuccess and onSubmitError callbacks with
closures that record errors/status on the span and call span.End() when the
async completion path runs; i.e., in tracedDASubmitter.SubmitHeaders and
SubmitData, create wrappedOnSuccess := func(){ span.SetStatus(codes.Ok, "");
onSubmitSuccess(); span.End() } and wrappedOnError := func(err error){
span.RecordError(err); span.SetStatus(codes.Error, err.Error());
onSubmitError(err); span.End() } (or equivalent wrappers matching original
signatures) and pass those to t.inner.SubmitHeaders/SubmitData so the span
lifetime and annotations reflect actual completion.

---

Nitpick comments:
In `@block/internal/cache/pending_data.go`:
- Around line 88-90: Add a Go doc comment for the exported method
ResetInFlightDataRange on type PendingData: describe what the method does, its
parameters (start, end uint64) and any important behavior or side-effects (it
delegates to pd.base.resetInFlightRange to reset the in-flight data range).
Place the comment immediately above the func declaration for
ResetInFlightDataRange so it satisfies Go exported identifier documentation
guidelines.

In `@block/internal/cache/pending_headers.go`:
- Around line 83-85: Add a Go doc comment for the exported method
PendingHeaders.ResetInFlightHeaderRange describing its purpose and behavior:
explain that it resets the in-flight header range tracked by the PendingHeaders
instance between the inclusive start and end uint64 indices, mention any side
effects (it delegates to ph.base.resetInFlightRange), and document the
parameters (start, end) and any expectations (e.g., inclusive bounds or
preconditions). Place the comment immediately above the ResetInFlightHeaderRange
method.

In `@block/internal/submitting/submitter_test.go`:
- Around line 424-440: The fakeDASubmitter currently drops the provided
callbacks in SubmitHeaders and SubmitData; update these methods (SubmitHeaders
and SubmitData on fakeDASubmitter) to call the supplied on-success and on-error
callbacks when they are non-nil so the test exercises the full success/error
lifecycle (e.g., invoke the success callback when you want the fake to simulate
success, or invoke the error callback with a test error to simulate failure),
while preserving the existing signaling to chHdr/chData; alternatively add a
focused test that uses a fake submitter which invokes those callbacks to assert
timestamps and in-flight cache resets.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 996c9001-1856-439f-b61e-aa2ea4669b4e

📥 Commits

Reviewing files that changed from the base of the PR and between 49ef5c9 and 31093e6.

📒 Files selected for processing (16)
  • block/internal/cache/manager.go
  • block/internal/cache/manager_test.go
  • block/internal/cache/pending_base.go
  • block/internal/cache/pending_base_test.go
  • block/internal/cache/pending_data.go
  • block/internal/cache/pending_data_test.go
  • block/internal/cache/pending_headers.go
  • block/internal/cache/pending_headers_test.go
  • block/internal/submitting/da_submitter.go
  • block/internal/submitting/da_submitter_integration_test.go
  • block/internal/submitting/da_submitter_mocks_test.go
  • block/internal/submitting/da_submitter_test.go
  • block/internal/submitting/da_submitter_tracing.go
  • block/internal/submitting/da_submitter_tracing_test.go
  • block/internal/submitting/submitter.go
  • block/internal/submitting/submitter_test.go

Comment thread block/internal/submitting/da_submitter_integration_test.go
Comment on lines +216 to +218
err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer, nil, nil)
require.NoError(t, err)
submitter.Close()
Contributor


⚠️ Potential issue | 🟡 Minor

Defer Close() in the success-path tests.

Both tests close the submitter only after the assertions. If one of those assertions fails, teardown is skipped and the async worker can leak into later tests. Use defer or t.Cleanup right after setup instead.

💡 Suggested fix
 submitter, st, cm, mockDA, gen := setupDASubmitterTest(t)
+defer submitter.Close()
@@
- submitter.Close()

Also applies to: 331-333

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/da_submitter_test.go` around lines 216 - 218, The
test currently calls submitter.Close() after assertions which can leak the
submitter's async worker if an assertion fails; change the teardown to run
immediately after setup by invoking defer submitter.Close() (or
t.Cleanup(func(){ submitter.Close() })) right after the submitter is created so
Close() always runs even on test failures — update the tests that call
submitter.SubmitHeaders(...) and later submitter.Close() (e.g., the cases around
SubmitHeaders and the other similar test) to use deferred cleanup instead.

Comment on lines +388 to +398
case datypes.StatusSuccess:
submitted := int(res.SubmittedCount)
if onSuccess != nil {
onSuccess(submitted, res.Height)
}
s.logger.Info().Str("itemType", itemType).Int("count", submitted).Msg("successfully submitted items to DA layer")
if submitted == len(marshaled) {
return
}
// partial success: advance window
marshaled = marshaled[submitted:]
Contributor


⚠️ Potential issue | 🔴 Critical

Validate SubmittedCount before using it.

res.SubmittedCount comes from the DA client. A value of 0 leaves marshaled unchanged and rs.Attempt unmodified, so this loop spins forever; a value larger than the batch size panics here and in the outer headers[:submittedCount] / signedDataList[:submittedCount] callbacks. Reject out-of-range counts before calling onSuccess.

Suggested fix
 		case datypes.StatusSuccess:
 			submitted := int(res.SubmittedCount)
+			if submitted <= 0 || submitted > len(marshaled) {
+				err := fmt.Errorf("invalid submitted count %d for batch size %d", submitted, len(marshaled))
+				s.recordFailure(common.DASubmitterFailureReasonUnknown)
+				s.logger.Error().Err(err).Str("itemType", itemType).Msg("DA layer returned invalid submitted count")
+				if onError != nil {
+					onError(err)
+				}
+				return
+			}
 			if onSuccess != nil {
 				onSuccess(submitted, res.Height)
 			}
 			s.logger.Info().Str("itemType", itemType).Int("count", submitted).Msg("successfully submitted items to DA layer")

As per coding guidelines "Validate all inputs from external sources in Go code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/da_submitter.go` around lines 388 - 398, The
datalayer success branch uses res.SubmittedCount directly which can be 0 or
>len(marshaled) and cause infinite loops or panics; in the datypes.StatusSuccess
case (around res.SubmittedCount handling) validate that submitted :=
int(res.SubmittedCount) is >0 and <= len(marshaled) before calling onSuccess or
advancing the window (marshaled = marshaled[submitted:]); if submitted==0 treat
as a reject/error (update rs.Attempt or return/log and do not spin) and if
submitted>len(marshaled) treat as malformed input (log/error and reject) so only
a validated count is passed to onSuccess and used to slice marshaled.

Comment thread block/internal/submitting/submitter.go Outdated
Contributor

@coderabbitai coderabbitai Bot left a comment


♻️ Duplicate comments (1)
block/internal/submitting/da_submitter.go (1)

384-395: ⚠️ Potential issue | 🔴 Critical

Validate SubmittedCount before using it.

The res.SubmittedCount from the DA client is used without validation. If submitted == 0, the loop continues with unchanged marshaled and reset backoff, potentially causing an infinite loop. If submitted > len(marshaled), the slice operation at line 394 will panic.

🛡️ Proposed fix to validate SubmittedCount
 		case datypes.StatusSuccess:
 			submitted := int(res.SubmittedCount)
+			if submitted <= 0 || submitted > len(marshaled) {
+				s.recordFailure(common.DASubmitterFailureReasonUnknown)
+				err := fmt.Errorf("invalid submitted count %d for batch size %d", submitted, len(marshaled))
+				s.logger.Error().Err(err).Str("itemType", itemType).Msg("DA layer returned invalid submitted count")
+				if onError != nil {
+					onError(err)
+				}
+				return
+			}
 			if onSuccess != nil {
 				onSuccess(submitted, res.Height)
 			}

As per coding guidelines: "Validate all inputs from external sources in Go code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/da_submitter.go` around lines 384 - 395, The code
uses res.SubmittedCount directly which can be 0 or >len(marshaled); validate it
before slicing and advancing. In the datypes.StatusSuccess branch (symbols:
res.SubmittedCount, marshaled, onSuccess, rs.Next, reasonSuccess, pol) ensure
submitted := int(res.SubmittedCount) is checked: if submitted <= 0 treat as no
progress (do not reset backoff — call rs.Fail or return to avoid infinite loop),
if submitted > len(marshaled) cap it to len(marshaled) before calling onSuccess
and slicing; only advance marshaled and call rs.Next when a positive, bounded
submitted value was applied.
🧹 Nitpick comments (2)
block/internal/submitting/submitter.go (1)

249-260: Minor: Avoid logging when err is nil.

When onError is called with nil (on context cancellation or empty batch from submitWithRetry), this logs an error with no actual error. Consider guarding the log statement.

♻️ Proposed improvement
 						onError := func(err error) {
 							if len(headers) > 0 {
 								s.cache.ResetInFlightHeaderRange(headers[0].Height(), headers[len(headers)-1].Height())
 							}
 							if errors.Is(err, common.ErrOversizedItem) {
 								s.logger.Error().Err(err).
 									Msg("CRITICAL: Header exceeds DA blob size limit - halting to prevent live lock")
 								s.sendCriticalError(fmt.Errorf("unrecoverable DA submission error: %w", err))
 								return
 							}
-							s.logger.Error().Err(err).Msg("failed to submit headers")
+							if err != nil {
+								s.logger.Error().Err(err).Msg("failed to submit headers")
+							}
 						}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/submitter.go` around lines 249 - 260, The onError
closure should not log when err is nil: after calling
s.cache.ResetInFlightHeaderRange(headers[0].Height(),
headers[len(headers)-1].Height()) (if headers present), add an early guard if
err == nil { return } so you skip the error handling below; keep the
oversized-item check (errors.Is(err, common.ErrOversizedItem)),
s.logger.Error().Err(err).Msg("failed to submit headers"), and
s.sendCriticalError(...) as-is for non-nil errors.
block/internal/submitting/da_submitter.go (1)

326-331: Consider documenting the onError(nil) contract.

Calling onError(nil) for empty batches and context cancellations signals completion without error, but the callback name suggests an error occurred. This works correctly with the caller's errors.Is(err, common.ErrOversizedItem) check, but the semantics could be clearer.

Consider renaming to onComplete(error) or documenting that nil indicates graceful termination without retry failure.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/da_submitter.go` around lines 326 - 331, The
callback parameter onError in da_submitter.go is used to signal both actual
errors and graceful completion by calling onError(nil) for empty
batches/cancellations; update the code to make this contract explicit by either
renaming the parameter from onError to onComplete (and update all call sites and
related tests) or by adding a clear doc comment on the onError
parameter/signature (and any public type that exposes it) stating that a nil
argument indicates graceful termination/no-retry, while a non-nil error
indicates a retryable/failure condition; ensure you update comments and any
place that inspects the value (e.g., callers that use errors.Is(err,
common.ErrOversizedItem)) to reflect the new name or documented behavior.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: bbad2e4d-1de5-4835-bdcd-e5d352bbf063

📥 Commits

Reviewing files that changed from the base of the PR and between 31093e6 and da7df02.

📒 Files selected for processing (3)
  • block/internal/cache/pending_base.go
  • block/internal/submitting/da_submitter.go
  • block/internal/submitting/submitter.go
✅ Files skipped from review due to trivial changes (1)
  • block/internal/cache/pending_base.go

@julienrbrt julienrbrt force-pushed the julien/speedup-submitter branch from 730acad to 9669b26 Compare April 29, 2026 13:53
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
block/internal/submitting/submitter.go (1)

153-168: ⚠️ Potential issue | 🟠 Major

Wait for submitter workers to stop producing before closing the async DA submitter.

Stop() calls s.daSubmitter.Close() before waiting on s.wg, but the goroutines tracked by s.wg can still reach SubmitHeaders / SubmitData after Close() has started. That lets new DA goroutines escape the wait and survive shutdown. Reorder this so cancellation stops the producers first, then wait for s.wg, then Close() the DA submitter.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/submitter.go` around lines 153 - 168, In
Submitter.Stop(), currently s.daSubmitter.Close() is called before waiting for
s.wg which allows goroutines to call SubmitHeaders/SubmitData after Close()
starts; change the order so you first call s.cancel() (if non-nil) to stop
producers, then wait for s.wg (with the existing timeout/done pattern), and only
after the wait completes or times out call s.daSubmitter.Close(); ensure
Submitter.Stop retains the timeout warning path and uses the same s.wg,
s.cancel, and s.daSubmitter.Close() symbols.
block/internal/submitting/da_submitter_tracing.go (1)

33-61: ⚠️ Potential issue | 🟠 Major

These spans no longer cover the actual DA submission.

The wrapped calls now return after enqueueing work, so defer span.End() records only enqueue latency. It also never sees async failures, and the current onSubmitSuccess callback is not a terminal signal because it can fire on partial successes. If you want end-to-end submission traces, this wrapper needs a real completion callback (or separate enqueue-vs-submit spans) instead of ending the span here.

Also applies to: 64-97

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/da_submitter_tracing.go` around lines 33 - 61, The
current tracedDASubmitter.SubmitHeaders creates a span and defers span.End(),
but the inner SubmitHeaders only enqueues work so the span ends too early and
misses async failures; change the wrapper to either (a) create two spans (an
immediate "enqueue" span that ends on return and a separate "submission" span
that is ended by a real completion callback), or (b) wrap the provided
onSubmitSuccess and onSubmitError with a new finalizing callback that records
errors, sets span status, and ends the span when the overall submission is truly
finished; apply the same pattern to the other traced methods mentioned (lines
64-97) so spans cover end-to-end submission rather than just enqueue latency,
and reference tracedDASubmitter.SubmitHeaders, the inner SubmitHeaders call, and
the onSubmitSuccess/onSubmitError callbacks when implementing the change.
♻️ Duplicate comments (2)
block/internal/submitting/da_submitter.go (1)

383-395: ⚠️ Potential issue | 🔴 Critical

Validate SubmittedCount before using it.

SubmittedCount comes from the DA client, but this branch uses it to drive both callbacks and slicing without checking bounds. 0 falsely reports success without advancing the window, and a value greater than len(marshaled) will panic here and in the post-submit callbacks.

Suggested fix
 	case datypes.StatusSuccess:
 		submitted := int(res.SubmittedCount)
+		if submitted <= 0 || submitted > len(marshaled) {
+			err := fmt.Errorf("invalid submitted count %d for batch size %d", submitted, len(marshaled))
+			s.recordFailure(common.DASubmitterFailureReasonUnknown)
+			s.logger.Error().Err(err).Str("itemType", itemType).Msg("DA layer returned invalid submitted count")
+			if onError != nil {
+				onError(err)
+			}
+			return
+		}
 		if onSuccess != nil {
 			onSuccess(submitted, res.Height)
 		}

As per coding guidelines "Validate all inputs from external sources in Go code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/da_submitter.go` around lines 383 - 395, Validate
res.SubmittedCount from the DA client before using it: ensure it's within [0,
len(marshaled)] and handle the zero-case by advancing the window (call
rs.Next(reasonSuccess, pol)) instead of treating it as full success; compute a
safe int value (e.g., capSubmitted := max(0, min(int(res.SubmittedCount),
len(marshaled)))) then call onSuccess(capSubmitted, res.Height) and log using
capSubmitted, slice marshaled = marshaled[capSubmitted:] only when capSubmitted
> 0, and log or error when res.SubmittedCount is out-of-range to avoid panics in
this block that contains res.SubmittedCount, onSuccess, marshaled, rs.Next,
reasonSuccess, pol, and s.logger.Info().
block/internal/submitting/da_submitter_test.go (1)

216-218: ⚠️ Potential issue | 🟡 Minor

Register Close() with t.Cleanup right after setup.

If one of the assertions fails before these lines, the async worker is never joined and can bleed into later tests. Move teardown next to setupDASubmitterTest.

Suggested fix
 submitter, st, cm, mockDA, gen := setupDASubmitterTest(t)
+t.Cleanup(submitter.Close)
 ...
- submitter.Close()

Also applies to: 331-333

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/da_submitter_test.go` around lines 216 - 218,
Register the submitter.Close cleanup immediately after setupDASubmitterTest
returns the submitter so the async worker is always joined even if an assertion
fails; i.e., right after obtaining the submitter variable in the test, call
t.Cleanup(func(){ submitter.Close() }) (and remove or leave any later explicit
submitter.Close() calls as desired) to ensure teardown; apply the same change to
the other test case that creates a submitter (the block that currently calls
submitter.Close() later).
🧹 Nitpick comments (1)
block/internal/cache/pending_data.go (1)

84-95: Add doc comments for the new exported PendingData methods.

NumPendingDataTotal and ResetInFlightDataRange are exported additions, so they should be documented like the other public cache APIs.

As per coding guidelines "Document exported types and functions in Go code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/cache/pending_data.go` around lines 84 - 95, Add Go doc
comments for the exported PendingData methods: NumPendingDataTotal,
SetLastSubmittedDataHeight, and ResetInFlightDataRange. For NumPendingDataTotal
and ResetInFlightDataRange add brief one-line comments describing what they
return/affect (e.g., number of pending data entries and that
ResetInFlightDataRange clears the in-flight range between start and end), and
for SetLastSubmittedDataHeight document the parameter and its effect on
PendingData state; follow the style and phrasing used by other public cache APIs
in the package.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 75185388-8242-4eb1-8f32-3f7b1a2c29f7

📥 Commits

Reviewing files that changed from the base of the PR and between bb4ec71 and 9669b26.

⛔ Files ignored due to path filters (1)
  • execution/grpc/go.sum is excluded by !**/*.sum
📒 Files selected for processing (17)
  • block/internal/cache/manager.go
  • block/internal/cache/manager_test.go
  • block/internal/cache/pending_base.go
  • block/internal/cache/pending_base_test.go
  • block/internal/cache/pending_data.go
  • block/internal/cache/pending_data_test.go
  • block/internal/cache/pending_headers.go
  • block/internal/cache/pending_headers_test.go
  • block/internal/executing/executor.go
  • block/internal/submitting/da_submitter.go
  • block/internal/submitting/da_submitter_integration_test.go
  • block/internal/submitting/da_submitter_mocks_test.go
  • block/internal/submitting/da_submitter_test.go
  • block/internal/submitting/da_submitter_tracing.go
  • block/internal/submitting/da_submitter_tracing_test.go
  • block/internal/submitting/submitter.go
  • block/internal/submitting/submitter_test.go
✅ Files skipped from review due to trivial changes (1)
  • block/internal/cache/pending_base_test.go
🚧 Files skipped from review as they are similar to previous changes (8)
  • block/internal/cache/manager_test.go
  • block/internal/cache/pending_headers.go
  • block/internal/submitting/da_submitter_integration_test.go
  • block/internal/cache/pending_headers_test.go
  • block/internal/executing/executor.go
  • block/internal/submitting/da_submitter_mocks_test.go
  • block/internal/cache/pending_base.go
  • block/internal/submitting/da_submitter_tracing_test.go

Comment on lines +227 to +235
	s.wg.Go(func() {
		s.submitWithRetry(ctx, envelopes, namespace, func(submittedCount int, daHeight uint64) {
			if submittedCount > 0 {
				postSubmit(headers[:submittedCount], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(submittedCount), Height: daHeight}})
			}
			if onSubmitSuccess != nil {
				onSubmitSuccess()
			}
		}, onSubmitError, "header")
Contributor

⚠️ Potential issue | 🔴 Critical

Track the submitted offset across partial-success callbacks.

submitWithRetry can invoke the success callback multiple times for one batch. These closures always use headers[:submittedCount] / signedDataList[:submittedCount], so the second partial success re-applies post-submit work to the beginning of the original batch instead of the remaining suffix. That will mark the wrong heights as included and advance the wrong last-submitted height whenever the DA layer accepts only part of a batch.

Suggested fix
 	postSubmit := s.makeHeaderPostSubmit(ctx, cache)
 	namespace := s.client.GetHeaderNamespace()
+	submittedOffset := 0

 	s.wg.Go(func() {
 		s.submitWithRetry(ctx, envelopes, namespace, func(submittedCount int, daHeight uint64) {
 			if submittedCount > 0 {
-				postSubmit(headers[:submittedCount], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(submittedCount), Height: daHeight}})
+				end := submittedOffset + submittedCount
+				postSubmit(headers[submittedOffset:end], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(submittedCount), Height: daHeight}})
+				submittedOffset = end
 			}
 			if onSubmitSuccess != nil {
 				onSubmitSuccess()
@@
 	postSubmit := s.makeDataPostSubmit(ctx, cache)
 	namespace := s.client.GetDataNamespace()
+	submittedOffset := 0

 	s.wg.Go(func() {
 		s.submitWithRetry(ctx, signedDataListBz, namespace, func(submittedCount int, daHeight uint64) {
 			if submittedCount > 0 {
-				postSubmit(signedDataList[:submittedCount], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(submittedCount), Height: daHeight}})
+				end := submittedOffset + submittedCount
+				postSubmit(signedDataList[submittedOffset:end], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(submittedCount), Height: daHeight}})
+				submittedOffset = end
 			}
 			if onSubmitSuccess != nil {
 				onSubmitSuccess()

Also applies to: 284-292

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/da_submitter.go` around lines 227 - 235, The
success callback passed into submitWithRetry (in the s.wg.Go closures that call
postSubmit and onSubmitSuccess) incorrectly always slices from the start
(headers[:submittedCount] / signedDataList[:submittedCount]) and thus
re-processes already-handled items on partial successes; fix this by introducing
and capturing a local cumulative offset (e.g., submittedOffset) alongside the
closure, have the callback use
headers[submittedOffset:submittedOffset+submittedCount] (and signedDataList
likewise), call postSubmit with that slice, then increment submittedOffset +=
submittedCount inside the callback so subsequent partial successes advance
correctly; apply the same change to the other closure that handles
signedDataList (the block around submitWithRetry at the second occurrence).
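The cumulative-offset fix described above can be illustrated in isolation (hypothetical names, not the PR's types):

```go
package main

import "fmt"

// makeOnSuccess returns a success callback that applies post-submit work
// only to the not-yet-handled suffix of batch by capturing a cumulative
// offset. Without the offset, a second partial success would re-process
// the beginning of the batch.
func makeOnSuccess(batch []string, postSubmit func([]string)) func(submitted int) {
	offset := 0
	return func(submitted int) {
		if submitted <= 0 {
			return
		}
		end := offset + submitted
		if end > len(batch) {
			end = len(batch) // defensive clamp against a misbehaving client
		}
		postSubmit(batch[offset:end])
		offset = end
	}
}

func main() {
	var applied [][]string
	cb := makeOnSuccess([]string{"h1", "h2", "h3", "h4"}, func(part []string) {
		applied = append(applied, append([]string(nil), part...))
	})
	cb(2) // first partial success covers h1, h2
	cb(2) // second covers h3, h4 — not h1, h2 again
	fmt.Println(applied) // [[h1 h2] [h3 h4]]
}
```

Because the offset lives in the closure, each batch (headers or signed data) gets its own independent progress window across repeated partial-success callbacks.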

